/**
 * Copyright (c) 2017-present, Facebook, Inc. and its affiliates.
 * All rights reserved.
 *
 * This source code is licensed under the BSD-style license found in the
 * LICENSE file in the root directory of this source tree.
 */
#pragma once

#include <unordered_map>

#include <folly/IntrusiveList.h>

#include "logdevice/common/AuthoritativeStatus.h"
#include "logdevice/common/Request.h"
#include "logdevice/common/ShardID.h"
#include "logdevice/common/types_internal.h"

namespace facebook { namespace logdevice {

class ProtocolReader;
class ProtocolWriter;

/**
 * Abstract base class for subscribers of shard authoritative status changes.
 * The onShardStatusChanged() callback is triggered whenever the authoritative
 * status of one or more shards is updated through
 * UpdateShardAuthoritativeMapRequest.
 */
class ShardAuthoritativeStatusSubscriber {
 public:
  ShardAuthoritativeStatusSubscriber() = default;

  // Subscribers are neither copy-constructible nor copy-assignable.
  ShardAuthoritativeStatusSubscriber(
      const ShardAuthoritativeStatusSubscriber&) = delete;

  ShardAuthoritativeStatusSubscriber&
  operator=(const ShardAuthoritativeStatusSubscriber&) = delete;

  /**
   * Registers this subscriber for shard authoritativeness updates with the
   * per-worker ShardAuthoritativeStatusManager.
   *
   * Must only be invoked once the worker has been constructed. For example,
   * LogRecoveryRequest registers for updates in execute() rather than at
   * construction time, when the worker may not be ready yet.
   */
  virtual void registerForShardAuthoritativeStatusUpdates();

  /**
   * Callback invoked once the shard authoritative status has been updated.
   * Must be implemented by every concrete subscriber.
   */
  virtual void onShardStatusChanged() = 0;

  virtual ~ShardAuthoritativeStatusSubscriber() = default;

  // Hook linking this subscriber into
  // ShardAuthoritativeStatusManager::subscribers_.
  folly::IntrusiveListHook listHook_;
};

/**
 * Store the authoritative state of all the shards in the cluster.
 * @see AuthoritativeStatus.h for a description of the different states a shard
 * can be in.
 *
 * This state is used by state machines that require an f-majority on a nodeset
 * and need to know of the state of all shards in the cluster in order to
 * discard nodes that are in state AUTHORITATIVE_EMPTY from the nodeset or
 * detect that they can only have a non-authoritative majority as too many nodes
 * are in the state UNDERREPLICATION. Such state machines are Recovery,
 * ClientReadStream (thus purging), etc.
 *
 * This object is generated by storage nodes when they receive a new event from
 * the event log, and is then serialized into a SHARD_STATUS_UPDATE_Message
 * that is sent to all clients that run a reader.
 */
class ShardAuthoritativeStatusMap {
 public:
  /**
   * State recorded for a single shard.
   *
   * Both members carry default initializers so that a default-initialized
   * ShardState never holds indeterminate values (operator== reads both
   * fields). The struct remains an aggregate under C++14 and later, so
   * ShardState{status, flag} keeps working.
   */
  struct ShardState {
    // Defaults to FULLY_AUTHORITATIVE, matching the convention that a shard
    // without an explicit entry in the map is fully authoritative.
    AuthoritativeStatus auth_status{AuthoritativeStatus::FULLY_AUTHORITATIVE};
    // Nodes performing a time-ranged rebuild are FULLY_AUTHORITATIVE.
    // On server nodes, we note when a shard is time-range rebuilding in the
    // ShardAuthoritativeStatusMap so that we can return E::REBUILDING
    // status to START messages for old clients that do not understand
    // the UNDER_REPLICATED gap type.
    // This field is not serialized.
    bool time_ranged_rebuild{false};

    // Two states are equal when both the status and the rebuild flag match.
    bool operator==(const ShardState& rhs) const {
      return auth_status == rhs.auth_status &&
          time_ranged_rebuild == rhs.time_ranged_rebuild;
    }
  };

  // Per-node container: shard index -> state of that shard.
  using NodeState = std::unordered_map<uint32_t, ShardState>;

  // Creates an empty map whose version is LSN_INVALID.
  ShardAuthoritativeStatusMap() : version_(LSN_INVALID) {}

  // Creates an empty map tagged with the given version.
  explicit ShardAuthoritativeStatusMap(lsn_t version) : version_(version) {}

  /**
   * @return version of this object. This is actually the LSN of the last event
   * log record that was considered when building this object.
   */
  lsn_t getVersion() const {
    return version_;
  }

  // Look up the authoritative status of a shard, addressed either by
  // (node, shard) pair or by ShardID.
  AuthoritativeStatus getShardStatus(node_index_t node, uint32_t shard) const;

  AuthoritativeStatus getShardStatus(const ShardID& shard_id) const;

  // Query for the time-ranged rebuild flag of the given shard (see
  // ShardState::time_ranged_rebuild).
  bool shardIsTimeRangeRebuilding(node_index_t node, uint32_t shard) const;

  /**
   * Record the status of the given shard.
   *
   * @param node                 index of the node owning the shard.
   * @param shard                index of the shard on that node.
   * @param status               authoritative status to record.
   * @param time_ranged_rebuild  time-ranged rebuild flag for the shard;
   *                             false when omitted.
   */
  void setShardStatus(node_index_t node,
                      uint32_t shard,
                      AuthoritativeStatus status,
                      bool time_ranged_rebuild = false);

  /**
   * Serialize the shard status map for transfer over the network.
   */
  void serialize(ProtocolWriter&) const;

  /**
   * Fill in the shard status map by deserializing data from the network.
   */
  void deserialize(ProtocolReader&);

  /**
   * @return concise dump of the status of each shard. Used for logging and
   * debugging.
   */
  std::string describe() const;

  /**
   * @return a map from a (node_id, shard_id) to authoritative status. Any shard
   * not present in the map is fully authoritative.
   */
  const std::unordered_map<node_index_t, NodeState>& getShards() const {
    return shards_;
  }

  /**
   * @return True if this and other contain the same authoritative status for
   * all shards. This function does not compare versions.
   */
  bool operator==(const ShardAuthoritativeStatusMap& other) const;
  bool operator!=(const ShardAuthoritativeStatusMap& other) const;

  // Entry to serialize in a SHARD_STATUS_UPDATE_Message.
  // The struct is packed; do not reorder or resize its fields.
  struct SerializedEntry {
    node_index_t node;
    uint32_t shard;
    AuthoritativeStatus status;
  } __attribute__((__packed__));

 private:
  // Version of this map, see getVersion().
  lsn_t version_;
  // We actually only store an entry for shards whose AuthoritativeStatus !=
  // FULLY_AUTHORITATIVE.
  std::unordered_map<node_index_t, NodeState> shards_;
};

/**
 * A request to update Worker::shardStatusManager()::shard_status_map_ on all
 * workers.
 * If this is a storage node, this request is created when EventLogStateMachine
 * updates its rebuilding set. If this is a client, this request is created when
 * we receive a SHARD_STATUS_UPDATE_Message.
 */
class UpdateShardAuthoritativeMapRequest : public Request {
 public:
  UpdateShardAuthoritativeMapRequest(ShardAuthoritativeStatusMap map,
                                     int worker,
                                     WorkerType worker_type)
      : Request(RequestType::UPDATE_SHARD_STATUS),
        map_(std::move(map)),
        worker_(worker),
        worker_type_(worker_type) {}

  int getThreadAffinity(int) override {
    return worker_;
  }
  WorkerType getWorkerTypeAffinity() override {
    return worker_type_;
  }

  Request::Execution execute() override;

  static void broadcastToAllWorkers(const ShardAuthoritativeStatusMap& map);

 private:
  ShardAuthoritativeStatusMap map_;
  int worker_;
  WorkerType worker_type_;
};

/**
 * Wrapper around ShardAuthoritativeStatusMap and the subscribers to its
 * updates.
 * Worker owns this object to keep track of the latest
 * ShardAuthoritativeStatusMap and of the subscribers list.
 * ShardAuthoritativeStatusManager::notifySubscribers() traverses the
 * subscribers list and invokes the onShardStatusChanged() callback on each
 * subscriber.
 */
class ShardAuthoritativeStatusManager {
 public:
  /**
   * @return mutable reference to the status map held by this manager.
   */
  ShardAuthoritativeStatusMap& getShardAuthoritativeStatusMap() {
    return shard_status_map_;
  }

  /**
   * @return read-only reference to the status map held by this manager.
   * Lets code that only holds a const manager inspect the map.
   */
  const ShardAuthoritativeStatusMap& getShardAuthoritativeStatusMap() const {
    return shard_status_map_;
  }

  /**
   * Appends `subscriber` to the end of the subscribers list. The list is
   * intrusive: it links the subscriber through its listHook_ member and does
   * not copy it.
   *
   * NOTE(review): the subscriber is presumably expected not to be linked into
   * a list already when this is called; inserting an already-linked hook is
   * not supported by Boost.Intrusive-style lists -- confirm against
   * folly::IntrusiveListHook.
   */
  void subscribe(ShardAuthoritativeStatusSubscriber& subscriber) {
    subscribers_.push_back(subscriber);
  }

  // Defined out of line. Intended to invoke onShardStatusChanged() on every
  // subscriber currently in the list.
  void notifySubscribers();

 private:
  // Subscribers linked through ShardAuthoritativeStatusSubscriber::listHook_.
  folly::IntrusiveList<ShardAuthoritativeStatusSubscriber,
                       &ShardAuthoritativeStatusSubscriber::listHook_>
      subscribers_;

  // Status map exposed through getShardAuthoritativeStatusMap().
  ShardAuthoritativeStatusMap shard_status_map_;
};

}} // namespace facebook::logdevice
